package com.uxsino.reactorq.clientserver;

import java.io.IOException;
import java.util.function.Function;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.BlobMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uxsino.reactorq.commons.IWithAttachmentStream;
import com.uxsino.reactorq.commons.Response;

/**
 * Synchronous call/response server for JMS.
 * <p>
 * Listens on a queue, deserializes the text of each incoming message into
 * {@code ArgType} with Jackson, passes it to the configured handler and, when the
 * request carries a {@code JMSReplyTo} destination, sends back a JSON-serialized
 * {@link Response} whose correlation id is the request's JMS message id.
 * <p>
 * The underlying session is transacted: the consumed request and the reply (if
 * any) are committed together after each message has been processed.
 *
 * @param <ArgType> type the incoming message text is deserialized into
 */
public class JMSSyncServer<ArgType> implements javax.jms.MessageListener {
    private static final Logger logger = LoggerFactory.getLogger(JMSSyncServer.class);

    /** Time-to-live applied to reply messages, in milliseconds (one hour). */
    private static final int TIME_TO_LIVE = 60 * 60 * 1000;

    /** Shared JSON mapper; used with its default configuration only. */
    private static final ObjectMapper mapper = new ObjectMapper();

    private final QueueConnection connection;

    private QueueSession session;

    private javax.jms.MessageConsumer receiver;

    private javax.jms.MessageProducer replier;

    private javax.jms.Queue queue;

    private final String queueName;

    private final Function<ArgType, ?> handler;

    private final Class<ArgType> eventType;

    /**
     * Creates the server and, when a connection is supplied, registers it as the
     * listener of the request queue.
     *
     * @param connection JMS connection to use; when {@code null} no session or consumer is created
     * @param queueName  name of the queue to receive requests from
     * @param receiverId when non-null, only messages whose {@code receiverId} property equals
     *                   this value are consumed; when {@code null}, all messages are consumed
     * @param cls        class the message text is deserialized into
     * @param handler    function invoked with each deserialized request; its result is
     *                   serialized to JSON as the reply payload. May be {@code null}, in which
     *                   case an untouched {@link Response} is sent back
     * @throws JMSException if the session, consumer or producer cannot be created
     */
    public JMSSyncServer(QueueConnection connection, String queueName, String receiverId, Class<ArgType> cls,
        Function<ArgType, ?> handler) throws JMSException {
        this.eventType = cls;
        this.handler = handler;
        this.connection = connection;
        this.queueName = queueName;

        if (connection != null) {
            createReceiver(receiverId);
        }
    }

    /**
     * Creates the transacted session, the (optionally filtered) consumer and the
     * anonymous producer used for replies.
     */
    private void createReceiver(String receiverId) throws JMSException {
        // The session is transacted, so the acknowledge mode argument is ignored by JMS;
        // messages are settled by session.commit() in onMessage.
        session = connection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
        queue = session.createQueue(queueName);
        if (receiverId != null) {
            // A single quote inside a selector string literal must be doubled, otherwise
            // the id would terminate the literal and break (or alter) the selector.
            String selector = "receiverId = '" + receiverId.replace("'", "''") + "'";
            receiver = session.createConsumer(queue, selector);
        } else {
            receiver = session.createConsumer(queue);
        }
        receiver.setMessageListener(this);
        // No fixed destination: the reply destination is supplied per send.
        replier = session.createProducer(null);
        replier.setTimeToLive(TIME_TO_LIVE);
    }

    /**
     * Starts the underlying connection, if there is one.
     *
     * @return this server, for chaining
     * @throws JMSException if the connection cannot be started
     */
    public JMSSyncServer<ArgType> connect() throws JMSException {
        if (connection != null) {
            connection.start();
        }
        return this;
    }

    /**
     * @return the name of the queue this server receives requests from
     */
    public String getQueueName() {
        return queueName;
    }

    /**
     * Extracts the request text: the body of a {@link TextMessage}, otherwise the
     * {@code text} string property of the message.
     */
    private String getMessageText(Message msg) throws JMSException {
        if (msg instanceof TextMessage) {
            return ((TextMessage) msg).getText();
        }
        return msg.getStringProperty("text");
    }

    /**
     * Handles one request: runs the handler, replies when a reply destination is
     * set, and commits the session transaction.
     * <p>
     * JMS and runtime failures are logged and not propagated. In that case the
     * transaction is neither committed nor rolled back here, so the handler is not
     * re-invoked by this class for the same delivery.
     *
     * @param message the incoming request
     */
    @Override
    public void onMessage(Message message) {
        try {
            Response response = process(message);

            // Any Destination is accepted by MessageProducer.send; casting to Queue would
            // fail for topic reply destinations.
            javax.jms.Destination replyTo = message.getJMSReplyTo();
            if (replyTo != null) {
                String body = serialize(response);
                if (body != null) {
                    TextMessage reply = session.createTextMessage();
                    reply.setText(body);
                    reply.setJMSCorrelationID(message.getJMSMessageID());
                    replier.send(replyTo, reply);
                }
            }

            // Commit even when no reply was sent: in a transacted session the consumed
            // request is only acknowledged on commit.
            if (session != null) {
                session.commit();
            }
        } catch (JMSException jmse) {
            logger.error("", jmse);
        } catch (RuntimeException rte) {
            logger.error("", rte);
        }
    }

    /**
     * Deserializes the request, invokes the handler and wraps the outcome in a
     * {@link Response}. Any failure is reported through the response status rather
     * than thrown.
     */
    private Response process(Message message) {
        Response response = new Response();
        if (handler == null) {
            return response;
        }
        try {
            ArgType ev = mapper.readValue(getMessageText(message), eventType);
            attachStream(ev, message);
            Object result = handler.apply(ev);
            response.j = mapper.writeValueAsString(result);
            response.status = Response.STATUS_SUCCESS;
        } catch (Exception e) {
            logger.error("error handling message", e);
            response.status = Response.STATUS_ERROR;
            response.j = describe(e);
        }
        return response;
    }

    /**
     * Hands the blob input stream to the event when both sides support it. A failure
     * to open the stream is logged and the event is processed without attachment.
     */
    private void attachStream(ArgType ev, Message message) {
        if (ev instanceof IWithAttachmentStream && message instanceof BlobMessage) {
            try {
                ((IWithAttachmentStream) ev).setAttachement(((BlobMessage) message).getInputStream());
            } catch (IOException | JMSException e) {
                logger.error("error getting attachment", e);
            }
        }
    }

    /**
     * Serializes the response to JSON. If that fails, the response is turned into an
     * error response and serialized again.
     *
     * @return the JSON text, or {@code null} when even the error response cannot be serialized
     */
    private String serialize(Response response) {
        try {
            return mapper.writeValueAsString(response);
        } catch (JsonProcessingException e) {
            logger.error("", e);
            response.status = Response.STATUS_ERROR;
            response.j = describe(e);
            try {
                return mapper.writeValueAsString(response);
            } catch (Exception ex) {
                // Nothing more can be sent to the caller; record the failure and give up on the reply.
                logger.error("unable to serialize error reply", ex);
                return null;
            }
        }
    }

    /**
     * Returns the exception message, falling back to {@code toString()} when the
     * exception carries no message, so the error reply always has some detail.
     */
    private static String describe(Throwable t) {
        String text = t.getMessage();
        return text != null ? text : t.toString();
    }

}
